{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div style=\"float: right; margin: 20px 20px 20px 20px\"><img src=\"images/bro.png\" width=\"130px\"></div>\n",
    "\n",
    "# Zeek to Kafka\n",
    "This notebook covers how to stream Zeek data using Kafka as a message queue. The setup takes a bit of work but the result will be robust way to stream data from Zeek.\n",
    "\n",
    "<div style=\"float: right; margin: 20px 20px 20px 20px\"><img src=\"images/kafka.png\" width=\"180px\"></div>\n",
    "\n",
    "### Software\n",
    "- Zeek Network Monitor: https://www.zeek.org\n",
    "- Kafka Zeek Plugin: https://github.com/apache/metron-bro-plugin-kafka\n",
    "- Kafka: https://kafka.apache.org"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Part 1: Streaming data pipeline\n",
    "To set some context, our long term plan is to build out a streaming data pipeline. This notebook will help you get started on this path. After completing this notebook you can look at the next steps by viewing our notebooks that use Spark on Zeek output.\n",
    " - [Zeek to Spark](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/master/notebooks/Zeek_to_Spark.ipynb)\n",
    " - [Zeek to Kafka to Spark](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/master/notebooks/Zeek_to_Kafka_to_Spark.ipynb)\n",
    "\n",
    "So our streaming pipeline looks conceptually like this.\n",
    "<div style=\"margin: 20px 20px 20px 20px\"><img src=\"images/pipeline.png\" width=\"750px\"></div>\n",
    "\n",
    "- **Kafka Plugin for Zeek**\n",
    "- **Publish (provides a nice decoupled architecture)**\n",
    "- **Pull/Subscribe to whatever feed you want (http, dns, conn, x509...)**\n",
    "- ETL (Extract Transform Load) on the raw message data (parsed data with types)\n",
    "- Perform Filtering/Aggregation\n",
    "- Data Analysis and Machine Learning"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "<div style=\"float: right; margin: 20px 0px 0px 0px\"><img src=\"images/confused.jpg\" width=\"300px\"></div>\n",
    "\n",
    "# Getting Everything Setup\n",
    "Things you'll need:\n",
    "- A running Zeek network security monitor: https://docs.zeek.org/en/stable/install/install.html\n",
    "- The Kafka Plugin for Zeek: https://github.com/apache/metron-bro-plugin-kafka\n",
    "- A Kafka Broker: https://kafka.apache.org\n",
    "\n",
    "The weblinks above do a pretty good job of getting you setup with Zeek, Kafka, and the Kafka plugin. If you already have these thing setup then you're good to go. If not take some time and get both up and running. If you're a bit wacky (like me) and want to set these thing up on a Mac you might check out my notes here [Zeek/Kafka Mac Setup](https://github.com/SuperCowPowers/zat/blob/master/docs/zeek_kafka_mac.md)\n",
    "\n",
    "## Systems Check\n",
    "Okay now that Zeek with the Kafka Plugin is setup, lets do just a bit of testing to make sure it's all AOK before we get into making a Kafka consumer in Python.\n",
    "\n",
    "**Test the Zeek Kafka Plugin**\n",
    "\n",
    "Make sure the Kafka plugin is ready to go by running the follow command on your Zeek instance:\n",
    "\n",
    "```\n",
    "$ zeek -N Apache::Kafka\n",
    "Apache::Kafka - Writes logs to Kafka (dynamic, version 0.3.0)\n",
    "```\n",
    "\n",
    "**Activate the Kafka Plugin**\n",
    "\n",
    "There's a good explanation of all the options here (<https://github.com/apache/metron-bro-plugin-kafka>). In my case\n",
    "I needed to put a different load command when 'activating' the Kafka plugin in my local.zeek configuration file. Here's what I added to the 'standard' site/local.zeek file.\n",
    "\n",
    "```\n",
    "@load Apache/Kafka\n",
    "redef Kafka::topic_name = \"\";\n",
    "redef Kafka::send_all_active_logs = T;\n",
    "redef Kafka::kafka_conf = table(\n",
    "    [\"metadata.broker.list\"] = \"localhost:9092\"\n",
    ");\n",
    "```\n",
    "- The first line took me a while to figure out\n",
    "- The rest is, at least for me, the best setup:\n",
    "\n",
    "  By putting in a blank topic name, all output topics are labeled with the name of their log file. For instance, stuff that goes to dns.log is mapped to the 'dns' Kafka topic, http.log to the 'http' topic, and so on. This was exactly what I wanted.\n",
    "\n",
    "\n",
    "## Start Kafka\n",
    "- Linux: <https://kafka.apache.org/quickstart#quickstart_startserver>\n",
    "- Mac: If you installed with Brew it's running as a service\n",
    "\n",
    "## Run Zeek\n",
    "```\n",
    "$ zeek -i en0 <path to>/local.zeek\n",
    "or \n",
    "$ zeekctl deploy\n",
    "```\n",
    "\n",
    "## Verify messages are in the queue\n",
    "```\n",
    "$ kafka-console-consumer --bootstrap-server localhost:9092 --topic dns\n",
    "```\n",
    "**After a second or two.. you should start seeing DNS requests/replies coming out.. hit Ctrl-C after you see some.**\n",
    "```\n",
    "{\"ts\":1503513688.232274,\"uid\":\"CdA64S2Z6Xh555\",\"id.orig_h\":\"192.168.1.7\",\"id.orig_p\":58528,\"id.resp_h\":\"192.168.1.1\",\"id.resp_p\":53,\"proto\":\"udp\",\"trans_id\":43933,\"rtt\":0.02226,\"query\":\"brian.wylie.is.awesome.tk\",\"qclass\":1,\"qclass_name\":\"C_INTERNET\",\"qtype\":1,\"qtype_name\":\"A\",\"rcode\":0,\"rcode_name\":\"NOERROR\",\"AA\":false,\"TC\":false,\"RD\":true,\"RA\":true,\"Z\":0,\"answers\":[\"17.188.137.55\",\"17.188.142.54\",\"17.188.138.55\",\"17.188.141.184\",\"17.188.129.50\",\"17.188.128.178\",\"17.188.129.178\",\"17.188.141.56\"],\"TTLs\":[25.0,25.0,25.0,25.0,25.0,25.0,25.0,25.0],\"rejected\":false}\n",
    "```\n",
    "# If you made it this far you are done!\n",
    "<div style=\"float: left; margin: 20px 20px 20px 20px\"><img src=\"images/whew.jpg\" width=\"300px\"></div>\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Okay so now that the setup is done lets put together a bit of code to\n",
    "# process the Kafka 'topics' that are now being streamed from our Zeek instance\n",
    "\n",
    "# First we create a Kafka Consumer\n",
    "import json\n",
    "from kafka import KafkaConsumer\n",
    "consumer = KafkaConsumer('dns', bootstrap_servers=['localhost:9092'],\n",
    "     value_deserializer=lambda x: json.loads(x.decode('utf-8')))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Now lets process our Kafka Messages\n",
    "for message in consumer:\n",
    "    print(message.value)\n",
    "\n",
    "# Note: This will just loop forever, but here's an \n",
    "# example of the types of output you'll see\n",
    "{'ts': 1570120289.692109, 'uid': 'CAdnHRVdI94Upoej7', 'id.orig_h': '192.168.1.7', '...\n",
    "{'ts': 1570120295.655344, 'uid': 'Ctcv6F2bLT8fB9GOUb', 'id.orig_h': '192.168.1.5', ...\n",
    "{'ts': 1570120295.663177, 'uid': 'CLrohRNbVWuBecKud', 'id.orig_h': '192.168.1.2', '...\n",
    "{'ts': 1570120295.765735, 'uid': 'CxhnkA3sMdZcQJ6vf7', 'id.orig_h': '192.168.1.7', '...\n",
    "{'ts': 1570120295.765745, 'uid': 'CEPF9E4a9WeM1cFlSk', 'id.orig_h': 'fe80::4b8:c380:5a7..."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div style=\"float: right; margin: 20px 20px 20px 20px\"><img src=\"images/dynamic.jpg\" width=\"300px\"></div>\n",
    "\n",
    "## Now What?\n",
    "Okay so now we can actually do something useful with our new streaming data, in this case we're going to use some results from our 'Risky Domains' Notebook that computed a risky set of TLDs.\n",
    "- [Risky Domain Stats](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/master/notebooks/Risky_Domains.ipynb)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Using public VT API Key: Please set apikey=<your key> when creating this class\n"
     ]
    }
   ],
   "source": [
    "from pprint import pprint\n",
    "import tldextract\n",
    "from zat.utils import vt_query\n",
    "\n",
    "# Create a VirusTotal Query Class\n",
    "vtq = vt_query.VTQuery()\n",
    "risky_tlds = set(['info', 'tk', 'xyz', 'online', 'club', 'ru', 'website', \n",
    "                  'in', 'ws', 'top', 'site', 'work', 'biz', 'name', 'tech'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "OMG the Network is on Fire!!!\n",
      "{'filescan_id': None,\n",
      " 'positives': 2,\n",
      " 'query': 'uni10.tk',\n",
      " 'scan_date': '2019-05-29 09:03:43',\n",
      " 'scan_results': [('clean site', 59),\n",
      "                  ('unrated site', 8),\n",
      "                  ('malware site', 1),\n",
      "                  ('suspicious site', 1),\n",
      "                  ('malicious site', 1)],\n",
      " 'total': 70,\n",
      " 'url': 'http://uni10.tk/'}\n"
     ]
    }
   ],
   "source": [
    "# Now lets process our Kafka 'dns' Messages\n",
    "for message in consumer:\n",
    "    dns_message = message.value\n",
    "\n",
    "    # Pull out the TLD\n",
    "    query = dns_message['query']\n",
    "    tld = tldextract.extract(query).suffix\n",
    "\n",
    "    # Check if the TLD is in the risky group\n",
    "    if tld in risky_tlds:\n",
    "        # Make the query with the full query\n",
    "        results = vtq.query_url(query)\n",
    "        if results.get('positives'):\n",
    "            print('\\nOMG the Network is on Fire!!!')\n",
    "            pprint(results)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Part 1: Streaming data pipeline\n",
    "Recall that our long term plan is to build out a streaming data pipeline. This notebook has helped you get started on this path. After completing this notebook you can look at the next steps by viewing our notebooks that use Spark on Zeek output.\n",
    " - [Zeek to Spark](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/master/notebooks/Zeek_to_Spark.ipynb)\n",
    " - [Zeek to Kafka to Spark](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/master/notebooks/Zeek_to_Kafka_to_Spark.ipynb)\n",
    "\n",
    "\n",
    "<div style=\"margin: 20px 20px 20px 20px\"><img src=\"images/pipeline.png\" width=\"750px\"></div>\n",
    "\n",
    "\n",
    "<img align=\"right\" style=\"padding:20px\" src=\"images/SCP_med.png\" width=\"150\">\n",
    "\n",
    "## Wrap Up\n",
    "Well that's it for this notebook, we setup Zeek with the Kafka plugin and showed a simple use of how we might process the streaming data coming from Kafka.\n",
    "\n",
    "### Software\n",
    "- Zeek Network Monitor: https://www.zeek.org\n",
    "- Kafka Zeek Plugin: https://github.com/apache/metron-bro-plugin-kafka\n",
    "- Kafka: https://kafka.apache.org\n",
    "\n",
    "## About SuperCowPowers\n",
    "The company was formed so that its developers could follow their passion for Python, streaming data pipelines and having fun with data analysis. We also think cows are cool and should be superheros or at least carry around rayguns and burner phones. <a href=\"https://www.supercowpowers.com\" target=\"_blank\">Visit SuperCowPowers</a>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
